feat: reorder row groups by statistics during sort pushdown#21580
zhuqi-lucas wants to merge 4 commits into apache:main from feat/reorder-row-groups-by-stats
Conversation
When sort pushdown is active, reorder row groups within each file by their min/max statistics to match the requested sort order. This helps TopK queries find optimal values first via dynamic filter pushdown.

- Add `reorder_by_statistics` to `PreparedAccessPlan`, which sorts `row_group_indexes` by the first sort column's min values
- Pass the sort order from `ParquetSource::try_pushdown_sort` through to the opener via the `sort_order_for_reorder` field
- Reordering happens after pruning but before reverse (they compose)
- Gracefully skips the reorder when statistics are unavailable, the sort expression is not a simple column, a `row_selection` is present, or there are <=1 row groups

Closes apache#21317
Force-pushed from 3700464 to a013bf6
run benchmarks

🤖 Benchmark running (GKE): comparing feat/reorder-row-groups-by-stats (a013bf6) to 29c5dd5 (merge-base) using tpch
🤖 Benchmark running (GKE): comparing feat/reorder-row-groups-by-stats (a013bf6) to 29c5dd5 (merge-base) using clickbench_partitioned
🤖 Benchmark running (GKE): comparing feat/reorder-row-groups-by-stats (a013bf6) to 29c5dd5 (merge-base) using tpcds
Pull request overview
Note: Copilot was unable to run its full agentic suite in this review.
This PR improves TopK performance for Parquet scans when sort pushdown is Inexact by enabling row-group reordering based on statistics, so likely “best” row groups are read earlier and dynamic filters can tighten sooner.
Changes:
- Thread an optional `LexOrdering` from `ParquetSource::try_pushdown_sort` through `ParquetMorselizer` to the access-plan preparation step.
- Add `PreparedAccessPlan::reorder_by_statistics` to reorder `row_group_indexes` using Parquet statistics.
- Add unit tests covering reorder/skip behavior for multiple edge cases.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| datafusion/datasource-parquet/src/source.rs | Plumbs sort ordering into the file source for later row-group reordering. |
| datafusion/datasource-parquet/src/opener.rs | Carries optional sort ordering into the opener and applies reorder_by_statistics during plan preparation. |
| datafusion/datasource-parquet/src/access_plan.rs | Implements row-group reordering by statistics and adds focused unit tests. |
```rust
let sort_order = LexOrdering::new(order.iter().cloned());
let mut new_source = self.clone().with_reverse_row_groups(true);
new_source.sort_order_for_reorder = sort_order;
```
LexOrdering::new(...) appears to return a Result<LexOrdering, _> (as used with .unwrap() in the new unit tests), but here it’s assigned directly without ?/unwrap, and then assigned to sort_order_for_reorder: Option<LexOrdering> without wrapping in Some(...). This should be changed to construct a LexOrdering with error propagation and store it as Some(sort_order) (or skip setting the field on error). Otherwise this won’t compile.
Suggested change:

```rust
let sort_order = LexOrdering::new(order.iter().cloned())?;
let mut new_source = self.clone().with_reverse_row_groups(true);
new_source.sort_order_for_reorder = Some(sort_order);
```
```rust
// LexOrdering is guaranteed non-empty, so first() returns &PhysicalSortExpr
let first_sort_expr = sort_order.first();
```
sort_order.first() (if LexOrdering is Vec-like) returns Option<&PhysicalSortExpr>, but the code uses it as if it were &PhysicalSortExpr (first_sort_expr.expr...). This is likely a compile error. A concrete fix is to obtain the first element via iteration and handle the empty case (e.g., early-return Ok(self) if no sort expressions), then use the returned &PhysicalSortExpr.
Suggested change:

```rust
let first_sort_expr = match sort_order.iter().next() {
    Some(expr) => expr,
    None => {
        debug!("Skipping RG reorder: empty sort order");
        return Ok(self);
    }
};
```
```rust
    }
};

let descending = first_sort_expr.options.descending;
```
For DESC ordering, reordering by min values is often a poor proxy for “row group likely contains the largest values first”; typically you want to sort by max when descending == true (and by min when ascending). This can significantly reduce the intended TopK benefit (and can even choose a worse first row group when ranges overlap). Consider switching to row_group_maxs(...) for descending order, and update the doc comment (currently mentions “min/max”) and the DESC unit test accordingly.
```rust
// Get min values for the selected row groups
let rg_metadata: Vec<&RowGroupMetaData> = self
    .row_group_indexes
    .iter()
    .map(|&idx| file_metadata.row_group(idx))
    .collect();

let min_values = match converter.row_group_mins(rg_metadata.iter().copied()) {
    Ok(vals) => vals,
    Err(e) => {
        debug!("Skipping RG reorder: cannot get min values: {e}");
        return Ok(self);
    }
};

// Sort indices by min values
let sort_options = arrow::compute::SortOptions {
    descending,
    nulls_first: first_sort_expr.options.nulls_first,
};
let sorted_indices = match arrow::compute::sort_to_indices(
    &min_values,
```
Suggested change:

```rust
// Get values for the selected row groups: mins for ASC, maxs for DESC
let rg_metadata: Vec<&RowGroupMetaData> = self
    .row_group_indexes
    .iter()
    .map(|&idx| file_metadata.row_group(idx))
    .collect();

let sort_values = match if descending {
    converter.row_group_maxs(rg_metadata.iter().copied())
} else {
    converter.row_group_mins(rg_metadata.iter().copied())
} {
    Ok(vals) => vals,
    Err(e) => {
        debug!("Skipping RG reorder: cannot get min/max values: {e}");
        return Ok(self);
    }
};

// Sort indices by the statistics that best match the requested order
let sort_options = arrow::compute::SortOptions {
    descending,
    nulls_first: first_sort_expr.options.nulls_first,
};
let sorted_indices = match arrow::compute::sort_to_indices(
    &sort_values,
```
Yes, this is a good point.
```rust
let sorted_indices = match arrow::compute::sort_to_indices(
    &min_values,
    Some(sort_options),
    None,
) {
```
If multiple row groups share the same min (or max) statistic, sort_to_indices may not guarantee a deterministic/stable tie-breaker across platforms/versions. Since row-group order can affect scan reproducibility and performance debugging, consider adding a stable secondary key (e.g., original row group index) when statistics are equal.
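A minimal sketch of the stable tie-breaker idea, using plain `i32` min statistics in place of Arrow arrays; the helper name `reorder_with_tiebreak` and the values are illustrative, not part of the PR:

```rust
// Sketch: deterministic row-group ordering with an explicit tie-breaker.
// Primary key: the statistic; secondary key: original position, so row
// groups with equal statistics keep a deterministic relative order.
fn reorder_with_tiebreak(row_group_indexes: &[usize], mins: &[i32], descending: bool) -> Vec<usize> {
    let mut order: Vec<usize> = (0..row_group_indexes.len()).collect();
    order.sort_by(|&a, &b| {
        let primary = if descending {
            mins[b].cmp(&mins[a])
        } else {
            mins[a].cmp(&mins[b])
        };
        primary.then(a.cmp(&b))
    });
    order.into_iter().map(|i| row_group_indexes[i]).collect()
}

fn main() {
    // RG 0 and RG 2 share min=5; their original order (0 before 2) is preserved.
    assert_eq!(reorder_with_tiebreak(&[0, 1, 2], &[5, 3, 5], false), vec![1, 0, 2]);
    assert_eq!(reorder_with_tiebreak(&[0, 1, 2], &[5, 3, 5], true), vec![0, 2, 1]);
}
```

Rust's `sort_by` is already stable, but making the secondary key explicit documents the intent and keeps the order deterministic even if the implementation later switches to `sort_unstable_by`.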
```rust
/// - 0 or 1 row groups (nothing to reorder)
/// - Sort expression is not a simple column reference
/// - Statistics are unavailable
pub(crate) fn reorder_by_statistics(
```
I think @adriangb had the great idea to also order by grouping keys which can
- reduce cardinality within partitions (partition-local state can be smaller)
- allow for better cache locality (row groups with more equal keys are grouped together)
Doesn't have to be in this PR but perhaps we can think about how it fits in.
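A hypothetical sketch of that idea: ordering row groups by the min statistic of a grouping column clusters similar keys together. The statistics here are plain integers, not real Parquet metadata, and `cluster_by_grouping_key` is an illustrative helper, not DataFusion API:

```rust
// Sketch: reorder row groups so those with nearby grouping-key ranges are
// adjacent, which can shrink partition-local aggregation state and improve
// cache locality. Key-range minimums are illustrative values.
fn cluster_by_grouping_key(key_mins: &[i32]) -> Vec<usize> {
    let mut order: Vec<usize> = (0..key_mins.len()).collect();
    order.sort_by_key(|&i| key_mins[i]);
    order
}

fn main() {
    // Row groups whose grouping-key ranges start near each other end up adjacent.
    assert_eq!(cluster_by_grouping_key(&[900, 100, 500, 120]), vec![1, 3, 2, 0]);
}
```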
Thanks @Dandandan for the review! That's a great extension. The reorder_by_statistics method is generic enough to take any LexOrdering — it doesn't need to be tied to TopK specifically. So extending this for GROUP BY should be a matter of:
- Computing a preferred RG ordering from grouping keys in the aggregate planner
- Passing it through to ParquetSource::sort_order_for_reorder
Happy to track this as a follow-up issue. Will open one after this PR lands.
Thanks @Dandandan! Created #21581 to track this. The existing infrastructure from this PR should be directly reusable — mainly needs the aggregate planner to populate sort_order_for_reorder from grouping keys.
run benchmarks

🤖 Benchmark running (GKE): comparing feat/reorder-row-groups-by-stats (5018882) to 29c5dd5 (merge-base) using clickbench_partitioned
🤖 Benchmark running (GKE): comparing feat/reorder-row-groups-by-stats (5018882) to 29c5dd5 (merge-base) using tpcds
🤖 Benchmark running (GKE): comparing feat/reorder-row-groups-by-stats (5018882) to 29c5dd5 (merge-base) using tpch
🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch
run benchmark clickbench_partitioned clickbench_extended

🤖 Benchmark running (GKE): comparing feat/reorder-row-groups-by-stats (5018882) to 29c5dd5 (merge-base) using clickbench_partitioned
🤖 Benchmark running (GKE): comparing feat/reorder-row-groups-by-stats (5018882) to 29c5dd5 (merge-base) using clickbench_extended
🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch
I wonder if the ordering should be done before the files / row groups are assigned to partitions, so they are more globally sorted instead of just per partition? It seems they are now sorted within each partition, which should help, but perhaps not nearly as much as if all the partitions contained the optimal row groups. This would also help in the case of #21581.
🤖 Benchmark completed (GKE): clickbench_extended — base (merge-base) vs branch
Great point @Dandandan — you're right that a global reorder is much more effective than a per-partition reorder. With global reorder + round-robin distribution, each partition's first RG is close to the global optimum.

The current per-partition reorder is limited because even after sorting, partition 0's "best" RG might be much worse than the global best (which may have landed in partition 2). Moving to global reorder would require changes at the planning / EnforceDistribution layer to load RG statistics and redistribute RGs across partitions. I'd prefer to keep this PR as an incremental step (per-partition) and track global reorder as a follow-up — it would benefit both #21317 and #21581. Does this make sense?
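A toy sketch of the global-reorder idea under discussion: sort all row groups by their min statistic (assuming an ascending requested order), then deal them round-robin across partitions so each partition starts near the global optimum. The function name and statistics are illustrative, not existing DataFusion code:

```rust
// Sketch: globally sort row groups by min statistic, then assign them
// round-robin so every partition's first row group is near-optimal.
fn distribute_globally_sorted(mins: &[i32], partitions: usize) -> Vec<Vec<usize>> {
    let mut order: Vec<usize> = (0..mins.len()).collect();
    order.sort_by_key(|&i| mins[i]); // ascending sort order assumed
    let mut out = vec![Vec::new(); partitions];
    for (pos, rg) in order.into_iter().enumerate() {
        out[pos % partitions].push(rg);
    }
    out
}

fn main() {
    let mins = [70, 10, 50, 30];
    // Global order by min: RG1 (10), RG3 (30), RG2 (50), RG0 (70);
    // dealt round-robin, both partitions lead with a low-min row group.
    assert_eq!(distribute_globally_sorted(&mins, 2), vec![vec![1, 2], vec![3, 0]]);
}
```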
For overlapping row group ranges, sorting by min for DESC can pick a worse first RG. Example: RG0(50-60) vs RG1(40-100) — min DESC picks RG0 first (max=60), but RG1 contains the largest values (max=100). Use min for ASC and max for DESC to correctly prioritize the row group most likely to contain the optimal values for TopK.
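The overlapping-range example can be checked with a small sketch; statistics are plain `(min, max)` pairs rather than real Parquet metadata, and `first_rg_desc_by` is a hypothetical helper:

```rust
// For a DESC scan, which row group would be read first if we rank by
// min statistics vs by max statistics? Returns the winning RG index.
fn first_rg_desc_by(stats: &[(i32, i32)], use_max: bool) -> usize {
    (0..stats.len())
        .max_by_key(|&i| if use_max { stats[i].1 } else { stats[i].0 })
        .unwrap()
}

fn main() {
    let stats = [(50, 60), (40, 100)]; // (min, max) per row group
    // Ranking by min picks RG0 (min 50 > 40) ...
    assert_eq!(first_rg_desc_by(&stats, false), 0);
    // ... but RG1 actually holds the largest values (max 100 > 60).
    assert_eq!(first_rg_desc_by(&stats, true), 1);
}
```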
Sure, makes sense.
Which issue does this PR close?
Closes #21317
Rationale for this change
When sort pushdown is active for Inexact paths (e.g., reverse scan or statistics-based reordering that can't be elevated to Exact), row groups within a file are currently read in their original order. This means TopK queries may read suboptimal row groups first, delaying threshold tightening.
By reordering row groups by their min/max statistics to match the requested sort order, TopK finds optimal values first. This gives two levels of benefit:
Row-level filtering (immediate): TopK sets a tight dynamic filter threshold after reading the first (best) row group. For subsequent row groups, the dynamic filter acts as a row-level filter — the parquet reader uses page index to skip non-matching pages and avoids decoding non-sort columns for filtered rows. Significant I/O reduction for wide tables.
Row-group-level skipping (follow-up with morsel API): Once morsel-driven scanning lands, the dynamic filter can be re-evaluated between row groups, skipping entire row groups when their min statistics exceed the TopK threshold.
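The row-group-level skipping described above can be reduced to a small sketch, using plain `i32` min statistics and an ascending TopK threshold; `surviving_row_groups` and the values are illustrative, not the actual DataFusion API:

```rust
// Sketch: for an ascending TopK, once the dynamic filter has a threshold,
// any row group whose min statistic already exceeds it cannot contribute
// a better value and can be skipped entirely.
fn surviving_row_groups(mins: &[i32], threshold: i32) -> Vec<usize> {
    mins.iter()
        .enumerate()
        .filter(|&(_, &min)| min <= threshold)
        .map(|(i, _)| i)
        .collect()
}

fn main() {
    // Suppose reading the best row group first tightened the threshold to 25:
    // row groups with mins 30 and 40 are skipped without being decoded.
    assert_eq!(surviving_row_groups(&[10, 30, 20, 40], 25), vec![0, 2]);
}
```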
What changes are included in this PR?
- Add `reorder_by_statistics` to `PreparedAccessPlan` that sorts `row_group_indexes` by the first sort column's statistics (min for ASC, max for DESC)
- Add a `sort_order_for_reorder: Option<LexOrdering>` field to `ParquetMorselizer` and `ParquetSource`
- Pass the sort order from `ParquetSource::try_pushdown_sort` (Inexact path) through to the opener
- Skip the reorder when a `row_selection` is present (remapping is complex)
Yes. Added 9 unit tests in `access_plan.rs` covering, among other cases, skipping when a `row_selection` is present and skipping when the sort expression is not a simple column (e.g. `col + 1`). SLT tests in `sort_pushdown.slt` verify end-to-end correctness with multi-RG files.

Benchmark
Note: The existing `sort_pushdown_sorted` benchmark only covers the Exact path (sort elimination). The performance benefit of this PR (Inexact path with RG reorder) is not captured by existing benchmarks. A follow-up benchmark for the Inexact path is tracked in #21582 and will be added separately to demonstrate the optimization impact.

Are there any user-facing changes?
No API changes. Users benefit from improved TopK performance on queries like `SELECT * FROM t ORDER BY col LIMIT N`.